iT邦幫忙

0

typescript 變 golang,教你怎麼把 golang 的chan select 用 typescript 實現

  • 分享至 

  • xImage
  •  

golang 的 chan select 實在太方便,其實任何提供了協程的語言都能很好且方便的支持 chan 和 select,因爲經常寫 typescript 腳本,於是我把這兩個組件實現到了一個 typescript,你可以直接使用我的庫來得到 chan 和 select,本文後續是實現代碼的分析,你也可以參照分析去任何支持協程的語言中把golang的特性發揚光大/images/emoticon/emoticon08.gif

chan 提供了什麼

首先我們要分析 chan 提供了哪些功能

  1. 可寫,如果當前不可寫則阻塞直到寫入成功
  2. 可讀,如果當前不可讀則阻塞直到讀取成功
  3. 讀寫緩存

所以我們不妨來單獨實現這些功能,最後將它們組合在一起就是完整的 chan 了

實現 Reader

首先來實現一個 Reader

class Reader {
    // 這個函數用來將值傳遞給 reader 讀取,意思是每當調用這個函數,正在阻塞讀取的 chan 就能得到值
    invoke(val: IteratorResult<any>);
    // 這個用來 關閉 reader,關閉後才能通知讀取的 協程 退出
    close() ;
}

有了Reader.invoke 就能通知阻塞讀取的chan返回了,那 Reader 還需要提供一個接口來讀取值爲此我專門定義了 class ReadValue 代表一個讀取的任務,我們直接看代碼說明

interface Connection {
    disconet(): void
}
class ReadValue {
    constructor(private readonly p: Reader, private readonly callback: ReadCallback) { }
    // 這個函數通知讀取成功,由 Reader 調用
    invoke(val: IteratorResult<any>) {
        // 這裏調用上層傳來的回調,通知上層(select 或者 chan)讀取到值了 
        this.callback(val)
    }
    // 這個函數從 reader 的讀取任務中撤銷註冊
    // 記得我們還要實現 select 嗎?select 等待到一個 chan 成功後需要撤銷對其它 case 的讀寫
    disconet() {
        this.p.disconet(this)
    }
}

ReadValue 代表了一個讀取任務(有誰調用了 chan 的 讀取),所以我們 Reader 實現就簡單了,每當上層代碼讀取 chan 我們就創建一個 ReadValue 把它加入到 Reader 的一個隊列中用於等待,當有值可讀了(Reader.invoke 被調用) 就找一個隨機的 ReadValue 去通知它讀取成功了

下面是 Reader 完整代碼

type ReadCallback = (val: IteratorResult<any>) => void

class Reader {
    // 記錄 是否被關閉
    private closed_ = false
    // 保存讀取的任務隊列 (<-chan)
    private vals = new Array<ReadValue>()
    get isEmpty(): boolean {
        return this.vals.length == 0
    }
    // 有值可讀了就調用這個函數
    invoke(val: IteratorResult<any>) {
        const vals = this.vals
        switch (vals.length) {
            case 0: // 如果沒有任務就拋出異常,實現正確的化永遠不會執行到 case 0
                throw errChannelReaderEmpty
            case 1: // 只有一個等待讀取的,就直接將 val 給它 並且,通知它成功
                vals.pop()!.invoke(val)
                return
        }
        // 如果多個讀取任務,就隨機選一個任務 通知它讀取成功
        const last = vals.length - 1
        const i = Math.floor(Math.random() * vals.length)
        if (i != last) { //swap to end // 把隨機選取的任務交互到數組最後面,因爲數組刪除最後一個元素比較快
            [vals[i], vals[last]] = [vals[last], vals[i]]
        }
        vals.pop()!.invoke(val)
    }
    // 關閉讀取
    close() {
        if (this.closed_) {
            return
        }
        this.closed_ = true
        const vals = this.vals
        if (vals.length != 0) { // 通知所有等待中的 chan,已經關閉了快點返回,永遠不會有新的值可以讀到了
            for (const val of vals) { 
                val.invoke(noResult) // 使用了一個 {done:true} 的 js 習慣用法來通知 迭代器 結束了,是的 你後續可以使用 js 的 await for of 來迭代 讀取這個 chan
            }
            vals.splice(0) // 將任務清空
        }
    }
    // 這裏創建一個讀取任務,讀取成功就調用 回調函數
    connect(callback: ReadCallback): ReadValue {
        const val = new ReadValue(this, callback)
        this.vals.push(val)
        return val
    }
    // 這裏撤銷註冊的任務,主要用於 select 的實現
    disconet(val: ReadValue) {
        const vals = this.vals
        for (let i = 0; i < vals.length; i++) {
            if (vals[i] == val) {
                vals.splice(i, 1)
                break
            }
        }
    }
}

Writer

Writer 的邏輯和 Reader 類似先看下最核心的定義

class Writer {
    // 調用這個就將一個寫入任務完成,正在執行寫入的 chan 就會返回寫入成功
    invoke() ;
    // 調用這個關閉寫入,golang 裏面會導致正在執行寫入的 chan 拋出 panic
    close();
}

下面來定義一個 WirteValue 來表示一個寫入任務

class WirteValue {
    constructor(private readonly p: Writer,
        private readonly callback: WriteCallback,
        private readonly reject: RejectCallback,
        public readonly value: any, // 這個屬性記錄了要寫入的值
    ) { }
    // 調用這個通知寫入成功
    invoke() {
        this.callback(true)
    }
    // Writer 關閉時就調用這個,通知上層寫入任務失敗
    error() {
        const reject = this.reject
        if (reject) {
            try {
                reject(errChannelClosed)
            } catch (_) {
            }
        } else {
            this.callback(false)
        }
    }
    // 調用這個就撤銷寫入,同樣是爲了實現 select 準備的
    disconet() {
        this.p.disconet(this)
    }
}

WirteValue 代表了一個寫入任務(有誰調用了 chan 的 寫入),每當上層代碼寫入 chan 我們就創建一個 WirteValue 把它加入到 Writer 的一個隊列中用於等待,當有值可寫了(Writer.invoke 被調用) 就找一個隨機的 WirteValue 去通知它寫入成功了

下面是 Writer 完整代碼

class Writer {
    private closed_ = false
    // 記錄註冊的寫入任務
    private vals = new Array<WirteValue>()
    get isEmpty(): boolean {
        return this.vals.length == 0
    }
    // 可寫了就調用這個函數,它就會找一個寫入任務來寫入
    invoke() {
        const vals = this.vals
        switch (vals.length) {
            case 0: // 沒有任務,如果實現正確,永遠不會執行到 case 0 
                throw errChannelWriterEmpty
            case 1: // 只有一個任務就直接讓它完成
                const p = vals.pop()!
                p.invoke()
                return p.value // 把要寫的值返回給調用者去寫入
        }
        // 有多個任務,就隨機選一個去完成
        const last = vals.length - 1
        const i = Math.floor(Math.random() * vals.length)
        if (i != last) { //swap to end //將完成的任務交互到數組末尾以便快速刪除
            [vals[i], vals[last]] = [vals[last], vals[i]]
        }
        const p = vals.pop()!
        p.invoke()
        return p.value // 把要寫的值返回給調用者去寫入
    }
    // 關閉
    close() {
        if (this.closed_) {
            return
        }
        this.closed_ = true
        const vals = this.vals
        if (vals.length != 0) {
            // 通知所有未完成的任務寫入失敗
            for (const val of vals) {
                val.error()
            }
            vals.splice(0)
        }
    }
    // 創建一個寫入任務
    connect(callback: WriteCallback, reject: RejectCallback, val: any): WirteValue {
        const result = new WirteValue(this, callback, reject, val)
        this.vals.push(result)
        return result
    }
    // 撤銷任務,爲實現 select 準備
    disconet(val: WirteValue) {
        const vals = this.vals
        for (let i = 0; i < vals.length; i++) {
            if (vals[i] == val) {
                vals.splice(i, 1)
                break
            }
        }
    }
}

實現 chan select

發現文章有點長,我將在下篇文章中分析如何先 chan 和 select 的


圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言